Use Pulsar Go client
Create a producer

You can configure Go producers using a ProducerOptions object. Here's an example:

    // Assumes client is an existing pulsar.Client.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        log.Fatal(err)
    }
    // Register Close right after the producer is created successfully.
    defer producer.Close()

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("hello"),
    })
    if err != nil {
        fmt.Println("Failed to publish message", err)
    } else {
        fmt.Println("Published message")
    }

For all available methods of the Producer interface, see the pulsar-client-go API documentation.

Monitor

The Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.

Write a simple producer application.

    package main

    import (
        "context"
        "fmt"
        "log"
        "net/http"
        "strconv"

        "github.com/apache/pulsar-client-go/pulsar"
        "github.com/prometheus/client_golang/prometheus/promhttp"
    )

    func main() {
        // Create a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Start a separate goroutine for Prometheus metrics
        // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
        go func() {
            prometheusPort := 2112
            log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
            http.Handle("/metrics", promhttp.Handler())
            // Use a local err so this goroutine does not write to main's err variable
            if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
                log.Fatal(err)
            }
        }()

        // Create a producer
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic: "topic-1",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        ctx := context.Background()

        // Write your business logic here
        // In this case, you build a simple Web server.
        // You can produce messages by requesting http://localhost:8082/produce
        webPort := 8082
        http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
            msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
                Payload: []byte("hello world"),
            })
            if err != nil {
                // Report the failure to the caller instead of exiting the process
                log.Printf("Failed to publish message: %v", err)
                http.Error(w, "failed to publish message", http.StatusInternalServerError)
                return
            }
            log.Printf("Published message: %v", msgId)
            fmt.Fprintf(w, "Published message: %v", msgId)
        })

        err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
        if err != nil {
            log.Fatal(err)
        }
    }

To scrape metrics from the application, configure a locally running Prometheus instance using a configuration file (prometheus.yml).

    scrape_configs:
    - job_name: pulsar-client-go-metrics
      scrape_interval: 10s
      static_configs:
      - targets:
        - localhost:2112

Create a consumer

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on those topics. You can configure Go consumers using a ConsumerOptions object. Here's a basic example that uses channels:

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for i := 0; i